package onion.mqtt.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author Mr, Lu
 * @developmentTeam 浙江允泽信息科技有限公司
 * @createTime 2023/12/14
 */
public class MqttClient {
    static final Logger log = LoggerFactory.getLogger(MqttClient.class);
    private final EventLoopGroup eventLoopGroup;
    private final Bootstrap bootstrap;
    private final MqttClientConfig config;

    public MqttClient(MqttClientConfig config) {
        this.config = config;
        this.eventLoopGroup = new NioEventLoopGroup();
        this.bootstrap = new Bootstrap();
    }

    public MqttClient create() {
        MqttClientChannelInitializer channelInitializer = new MqttClientChannelInitializer();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(channelInitializer);
        return this;
    }

    /**
     * 连接服务器
     */
    public void connect() {
        this.reconnect(config.getHost(), config.getPort());
    }

    /**
     * 订阅主题
     *
     * @param topicName :主题名称
     * @param qos       ：服务端可以向此客户端发送的应用消息的最大QoS等级
     */
    public void subscribe(String topicName, MqttQoS qos) {

    }

    /**
     * 取消订阅主题
     *
     * @param topicName : 主题名称
     */
    public void unsubscribe(String topicName) {

    }

    /**
     * 关闭连接
     */
    public void disConnect() {
        eventLoopGroup.shutdownGracefully();
        log.debug("disconnect success.");
    }

    /**
     * 断线重连，客户端有断线重连机制，就更不能使用异步阻塞了
     *
     * @param host
     * @param port
     */
    private void reconnect(String host, Integer port) {
        bootstrap.remoteAddress(host, port);
        ChannelFuture channelFuture = bootstrap.connect();
        channelFuture.addListener((ChannelFutureListener) future -> {
            if (future.cause() != null) {
                log.debug("connect server: {}, port: {} fail.", host, port);
                future.channel().eventLoop().schedule(() -> reconnect(host, port), 3, TimeUnit.SECONDS);
            } else {
                log.debug("connect server: {}, port: {} success.", host, port);
            }
        });
    }
}
